package org.yun.octopus.mongo.async;

import org.springframework.data.mongodb.core.BulkOperations;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.yun.octopus.base.core.AsyncHelper;
import org.yun.octopus.base.core.MarkCmd;
import org.yun.octopus.base.util.CommonUtil;
import org.yun.octopus.base.util.SystemTimer;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * @ProjectName: octopus
 * @ClassName: MondoAsyncMark
 * @Description: mongo的批量打标
 * @Author: liyunfeng31
 * @Date: 2021/2/2 23:05
 */
@Component
public class MongoAsyncMark extends AsyncHelper {

    @Resource
    private MongoTemplate mongoTemplate;


    @Override
    public void batchSave() throws InterruptedException {
        List<MarkCmd> cmdList = new ArrayList<>();
        CommonUtil.drain(QUEUE, cmdList, CACHE_SIZE, 1, TimeUnit.SECONDS);
        if(CollectionUtils.isEmpty(cmdList)){
            return;
        }
        BulkOperations ops = mongoTemplate.bulkOps(BulkOperations.BulkMode.ORDERED, "mark");
        for (MarkCmd cmd : cmdList) {
            Query query = new Query(Criteria.where("bizNo").is(cmd.getBizNo()));
            Update update = new Update();
            int stepNo = cmd.getStepNo();
            String childNo = cmd.getChildNo();
            update.set("stepNo"+stepNo, stepNo)
                    .set("time", SystemTimer.currentTime())
                    .set("group",cmd.getGroup());
            if(childNo != null){
                update.set(stepNo+"No",childNo);
            }
            ops.upsert(query, update);
        }
        ops.execute();
    }
}
